package org.example.utils

import com.alibaba.fastjson.{JSON, JSONObject}
import io.searchbox.client.{JestClient, JestResult}
import io.searchbox.core.{Search, SearchResult}
import io.searchbox.indices.settings.{GetSettings, UpdateSettings}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.example.constant.ApolloConst
import org.example.dao.{DrivingContinuedIndex, MysqlConfig}
import org.example.common.Logging
import org.joda.time.DateTime
import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
import scalaj.http.{Http, HttpResponse}

import java.io._
import java.net.{HttpURLConnection, URI, URL}
import java.text.SimpleDateFormat
import java.util
import java.util._
import java.util.regex.Pattern
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
 * 通用工具类
 */
object CommonUtils extends Logging {


  /**
   * 获取sparkSession
   *
   * @return
   */
  def getSparkSession(): SparkSession = {
    val session = SparkSession
      .builder()
      //      .master("local[*]")
      //      .appName("datawarehouse")
      .enableHiveSupport()
      .config("hive.exec.dynamic.partition", true) // 支持 Hive 动态分区
      .config("hive.exec.dynamic.partition.mode", "nonstrict") // 非严格模式
      .config("hive.metastore.uris", ApolloConst.hiveMetastore) //"thrift://10.22.17.26:9083"
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .config("spark.sql.crossJoin.enabled", "true")
      .config("spark.sql.caseSensitive", "true")
      .config("es.nodes", ApolloConst.esNodes)
      .config("es.port", ApolloConst.esPort)
      .config("es.read.field.as.array.include", "true")
      .config("es.read.field.as.array.include", "warnList,processCommentList")
      .config("es.mapping.date.rich", "false")
      .getOrCreate()

    session
  }

  /**
   * 获取本地local测试数据
   * @return
   */
  def getLocalSparkSession():SparkSession = {
    val session = SparkSession
      .builder()
      .master("local[*]")
      .appName("test")
      .getOrCreate()
    session
  }

  //从mysql查询数据
  def selectFromMysql(sparkSession: SparkSession, mysqlConfig: MysqlConfig, sql: String): DataFrame = {
    sparkSession.read
      .format(mysqlConfig.format)
      .option("url", mysqlConfig.url)
      .option("user", mysqlConfig.user)
      .option("password", mysqlConfig.password)
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", sql)
      .load()
  }

  //创建mysql临时表
  def createTempMysqlTable(sparkSession: SparkSession, mysqlConfig: MysqlConfig, sql: String, tmp_table: String): Unit = {
    selectFromMysql(sparkSession, mysqlConfig, sql).createOrReplaceTempView(tmp_table)
  }
  /**
   * 获取hdfs配置
   *
   * @return
   */
  def getHdfs(): DistributedFileSystem = {
    val hdfs: DistributedFileSystem = new DistributedFileSystem()
    val hdfsConf = new Configuration()
    hdfsConf.set("fs.defaultFS", ApolloConst.FSName)
    hdfsConf.set("dfs.nameservices", "zcbigdata")
    hdfsConf.set("dfs.ha.namenodes.zcbigdata", "nn1,nn2")
    hdfsConf.set("dfs.namenode.rpc-address.zcbigdata.nn1", ApolloConst.nn1Address)
    hdfsConf.set("dfs.namenode.rpc-address.zcbigdata.nn2", ApolloConst.nn2Address)
    hdfsConf.set("dfs.client.failover.proxy.provider.zcbigdata", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")
    hdfsConf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable", true)
    hdfs.initialize(URI.create(ApolloConst.hdfsUrl), hdfsConf)
    hdfs
  }
  /**
   * 获取指定配置文件中的属性
   *
   * @return
   */
  def getProperties(filename: String): Properties = {
    val properties = new Properties()

    properties.load(this.getClass.getClassLoader.getResourceAsStream(filename))

    /*// 指定配置文件路径
    //    val path = Thread.currentThread().getContextClassLoader.getResource(filename).getPath
    val path = "/home/taskConf/" + filename
    // 加载配置文件内容
    properties.load(new FileInputStream(path))*/

    // 返回
    properties
  }

  /**
   * 验证字符串是否是中文
   *
   * @param data
   * @return
   */
  def checkChinese(data: String): Boolean = {
    var result = false
    if (null != data && !"".equals(data.trim)) {
      val pattern = Pattern.compile("^[\u4E00-\u9FFF]+$")
      val matcher = pattern.matcher(data)
      result = matcher.find()
    }
    result
  }

  /**
   * 创建UUID
   *
   * @return
   */
  def createUUID(): String = {
    UUID.randomUUID().toString.replace("-", "")
  }

  /**
   * 保存数据到hive
   *
   * @param dataFrame
   * @param mode
   * @param tableName
   */
  def saveDataToHive(dataFrame: DataFrame, mode: String, partitionday: String, tableName: String): Unit = {
    if (null != partitionday) {
      dataFrame.repartition(1)
        .write
        .mode(mode)
        .format("hive")
        .partitionBy(partitionday)
        .saveAsTable(tableName)
    } else {
      dataFrame
        .write
        .mode(mode)
        .format("hive")
        .saveAsTable(tableName)
    }
  }

  def insertDateIntoTable(dataFrame: DataFrame, mode: String, tableName: String): Unit = {
    dataFrame.repartition(1)
      .write
      .mode(mode)
      .format("hive")
      .insertInto(tableName)
  }

  //计算是否2个日期在30天内
  def yearToYearTheDays(year2: Date): Boolean = {
    //val format2 = new SimpleDateFormat("yyyy-MM-dd")
    val betweenTimes = new Date().getTime - year2.getTime
    val days = (betweenTimes / 1000 / 3600 / 24) - 1
    if (days >= 0 && days <= 30)
      true
    else
      false
  }


  /**
   * 获取指定日期前几天的年月日
   *
   * @param dateString 时间字符串
   * @param beforeDays 几天前
   * @return
   */
  def getBeforeDate(dateString: String, beforeDays: Int, format: String): String = {
    // 将时间字符串转换为时间类型
    val date = new SimpleDateFormat(getStrFormat(dateString)).parse(dateString)
    // 获取指定时间是一年中的第一天
    val calendar = Calendar.getInstance()
    calendar.setTime(date)
    val dayOfYear = calendar.get(Calendar.DAY_OF_YEAR)

    // 获取指定日期几天前的日期
    calendar.set(Calendar.DAY_OF_YEAR, dayOfYear - beforeDays)
    val preDay = new SimpleDateFormat(format).format(calendar.getTime)
    preDay
  }

  /**
   * 获取输入的时间字符串的格式
   *
   * @param dateString 时间字符串
   * @return 时间字符串对应的格式
   */
  private def getStrFormat(dateString: String): String = {
    // 要返回的时间格式
    var format = ""
    // 判断时间字符串格式
    if (dateString.contains("-") && dateString.length == 10) {
      format = "yyyy-MM-dd"
    } else if (dateString.contains("/") && dateString.length == 10) {
      format = "yyyy/MM/dd"
    } else if (dateString.length == 8) {
      format = "yyyyMMdd"
    }
    format
  }

  /**
   * 自动关闭
   *
   * @param acs
   */
  def autoCloseable(acs: AutoCloseable*): Unit = {
    acs.foreach { ac =>
      if (null != ac) {
        ac.close()
      }
    }
  }

  /**
   * 角度换算成弧度
   *
   * @param d 角度
   * @return
   */
  def rad(d: Double): Double = {
    return d * Math.PI / 180.0
  }

  /**
   * 根据经纬度计算两点间的球面距离
   *
   * @param lat1 纬度1
   * @param lng1 经度1
   * @param lat2 纬度2
   * @param lng2 经度2
   * @return
   */
  def getDistance(lat1: Double, lng1: Double, lat2: Double, lng2: Double): Double = {
    var s = 0D
    if (null != lat1 && null != lng1 && null != lat2 && null != lng2) {
      val EARTH_RADIUS = 6371.830D
      val radLat1 = rad(lat1)
      val radLat2 = rad(lat2)
      val radLng1 = rad(lng1)
      val radLng2 = rad(lng2)
      //var s = 2 * Math.asin(Math.sqrt(Math.pow(Math.sin(a / 2), 2) + Math.cos(radLat1) * Math.cos(radLat2) * Math.pow(Math.sin(b / 2), 2)))
      s = Math.acos(Math.cos(radLat1) * Math.cos(radLat2) * Math.cos(radLng1 - radLng2) + Math.sin(radLat1) * Math.sin(radLat2))
      s = s * EARTH_RADIUS
      s = Math.round(s * 1000D) / 1000D
      s
    } else {
      0D
    }
  }

  /**
   * 计算里程数 车辆行驶里程和行驶时间
   *
   * @param len_list
   * @return
   */
  def getAddressLen(len_list: List[String]): List[Seq[String]] = {
    var leg = 0D
    var arr: util.ArrayList[Long] = new util.ArrayList[Long]()
    val daytime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    var li = new util.ArrayList[String]()
    for (x <- 0 to len_list.length - 2) {
      val lonanlat = len_list.get(x).split(";")
      val lonandlat_next = len_list.get(x + 1).split(";")
      if ("null" != lonanlat(3) && "null" != lonandlat_next(1)) {
        if ("null" != lonanlat(1)) {
          li.add(len_list.get(x))
        } else {
          li.add(lonanlat(0) + ";" + lonandlat_next(1) + ";" + lonandlat_next(2) + ";" + lonanlat(3) + ";" + lonanlat(4))
        }
      } else if ("null" != lonanlat(1) && "null" != lonandlat_next(3)) {
        if ("null" != lonanlat(3)) {
          li.add(len_list.get(x))
        } else {
          li.add(lonandlat_next(0) + ";" + lonanlat(1) + ";" + lonanlat(2) + ";" + lonandlat_next(3) + ";" + lonandlat_next(4))
        }
      }
    }
    getDriverLength(li)
  }

  /**
   * 计算行驶里程
   *
   * @param arr
   * @return
   */
  def getDriverLength(arr: util.ArrayList[String]): List[Seq[String]] = {
    val arrs = arr.sortBy(x => x.substring(0, 19))
    var leg = 0D
    var arlis = new util.ArrayList[Long]()
    var arr_li = new util.ArrayList[Tuple4[String, String, Double, Double]]()
    val daytime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    for (x <- 0 to arrs.length - 2) {
      val str = arrs(x).split(";")
      val str1 = arrs(x + 1).split(",")
      val time1 = daytime.parse(str(0)).getTime
      val time2 = daytime.parse(str1(0)).getTime
      val len = getDistance(str(4).toDouble, str(3).toDouble, str1(4).toDouble, str1(3).toDouble)
      if (len != 0 && time1 != time2 && str(2) == str1(2)) {
        val times = time2 - time1
        arr_li.add((str(1), str(2), times, len))
      }
    }
    val tupleToTuple = arr_li.groupBy(x => (x._1, x._2)).mapValues(x => (x.map(x => x._3).reduce(_ + _), x.map(x => x._4).reduce(_ + _)))
    val tuples = tupleToTuple.map(x => Seq(x._1._1, x._1._2, x._2._1.toString, x._2._2.toString))
    tuples.toList
  }

  /**
   * 根据经纬度获取地理位置
   *
   * @param lon
   * @param lat
   * @return
   */
  def interfaceUrl(lon: String, lat: String): String = {
    try {
      val path = "http://10.197.236.100:40612/bcpbase/geocode/regeo"
      val url = new URL(path)
      val conn: HttpURLConnection = url.openConnection().asInstanceOf[HttpURLConnection]
      var out: PrintWriter = null
      //设置通用的请求属性
      conn.setRequestProperty("accept", "*/*")
      // 连接复用
      conn.setRequestProperty("connection", "Keep-Alive")
      conn.setRequestProperty("user-agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1)")
      //设置是否向httpUrlConnection输出，设置是否从httpUrlConnection读入，此外发送post请求必须设置这两个
      //最常用的Http请求无非是get和post，get请求可以获取静态页面，也可以把参数放在URL字串后面，传递给servlet，
      //post与get的 不同之处在于post的参数不是放在URL字串里面，而是放在http请求的正文内。
      conn.setDoOutput(true)
      conn.setDoInput(true)
      //获取URLConnection对象对应的输出流
      out = new PrintWriter(conn.getOutputStream())
      //发送请求参数即数据
      out.print(lat)
      //缓冲数据
      out.flush()
      //获取URLConnection对象对应的输入流
      val is = conn.getInputStream()
      //构造一个字符流缓存
      val br = new BufferedReader(new InputStreamReader(is, "GBK"))
      var str = ""
      var flage = true
      while (flage) {
        val s = br.readLine()
        if (s == null) {
          flage = false
        } else {
          str += s
        }
      }
      //关闭流
      is.close()
      conn.disconnect()
      str
    } catch {
      case e: Exception => e.printStackTrace()
        null
    }
  }

  import java.text.SimpleDateFormat

  /**
   * 两个时间差
   *
   * @param a
   * @param b
   * @return 秒数
   */
  def getBetweenSecond(a: String, b: String): Long = {
    val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    var startDate: Date = null
    var endDate: Date = null
    var ss = 0L
    try {
      startDate = format.parse(a)
      endDate = format.parse(b)
      ss = endDate.getTime - startDate.getTime
    } catch {
      case e: Exception =>
        println("时间格式错误:" + e.printStackTrace())
    }
    Math.abs(ss) / 1000
  }

  /**
   * 根据经纬度调用世纪高通API获取 逆地理信息
   *
   * @param lon
   * @param lat
   * @return
   */
  def getAddressAndLocationCode(lon: Double, lat: Double, detail: Int, road: Int, rowNumber: Int): JSONObject = {
    try {
      val param = scala.collection.immutable.Map("location" -> s"$lon,$lat", "detail" -> s"$detail", "road" -> s"$road", "rowNumber" -> s"$rowNumber")
      val response: HttpResponse[String] = Http(ApolloConst.reverseGeo).params(param).asString
      try {
        JSON.parseObject(response.body)
      } catch {
        case _ =>
          warn(s"JSON parse ERROR: ${response.body}")
          null
      }
    } catch {
      case _ =>
        null
    }
  }

  //根据经纬度获取详细地址，区县代码
  def getLocationCode(lon: Double, lat: Double): (String, String, String) = {
    val locationCode: JSONObject = getAddressAndLocationCode(lon, lat, 1, 0, 0)
    if (locationCode != null && locationCode.getIntValue("errcode") == 0) {
      val data: JSONObject = locationCode.getJSONObject("data")
      val addr: String = data.getString("address")
      val areaCode = data.getJSONObject("dist").getString("code")
      val areaName = data.getJSONObject("dist").getString("value")
      (addr, areaCode, areaName)
    } else {
      (s"逆地理接口调用失败[$lon,$lat]", "100000", "未知区域")
    }

  }

  def isIDNumber(IDNumber: String): Boolean = {
    if (IDNumber == null || "" == IDNumber) return false
    // 定义判别用户身份证号的正则表达式（15位或者18位，最后一位可以为字母）
    val regularExpression = "(^[1-9]\\d{5}(18|19|20)\\d{2}((0[1-9])|(10|11|12))(([0-2][1-9])|10|20|30|31)\\d{3}[0-9Xx]$)|" + "(^[1-9]\\d{5}\\d{2}((0[1-9])|(10|11|12))(([0-2][1-9])|10|20|30|31)\\d{3}$)"
    val matches = IDNumber.matches(regularExpression)
    //判断第18位校验值
    if (matches) if (IDNumber.length == 18) try {
      val charArray = IDNumber.toCharArray
      //前十七位加权因子
      val idCardWi = Array(7, 9, 10, 5, 8, 4, 2, 1, 6, 3, 7, 9, 10, 5, 8, 4, 2)
      //这是除以11后，可能产生的11位余数对应的验证码
      val idCardY = Array("1", "0", "X", "9", "8", "7", "6", "5", "4", "3", "2")
      var sum = 0
      var i = 0
      while ( {
        i < idCardWi.length
      }) {
        val current = String.valueOf(charArray(i)).toInt
        val count = current * idCardWi(i)
        sum += count

        {
          i += 1;
          i - 1
        }
      }
      val idCardLast = charArray(17)
      val idCardMod = sum % 11
      if (idCardY(idCardMod).toUpperCase == String.valueOf(idCardLast).toUpperCase) return true
      else {
        return false
      }
    } catch {
      case e: Exception =>
        e.printStackTrace()
        return false
    }
    matches
  }

  /**
   * 获取redis过期时间
   *
   * @return
   */
  def getSecondsNextDay(): Int = {
    val cal: Calendar = Calendar.getInstance()
    val now: Long = System.currentTimeMillis()
    val dateFormat: SimpleDateFormat = new SimpleDateFormat("HH")
    val nowHour = dateFormat.format(new Date(now))

    cal.add(Calendar.DAY_OF_YEAR, 1)
    cal.set(Calendar.HOUR_OF_DAY, 1)
    cal.set(Calendar.SECOND, 0)
    cal.set(Calendar.MINUTE, 0)
    cal.set(Calendar.MILLISECOND, 0)
    val time: Long = (cal.getTimeInMillis() - System.currentTimeMillis()) / 1000
    time.toInt
  }

  /**
   * 更新行驶表
   *
   * @param drivingContinued
   */
  def updateDrivingContinued(drivingContinued: DrivingContinuedIndex): Unit = {
    val id = drivingContinued.vehicle_no + "_" + drivingContinued.vehicle_color + "_" + drivingContinued.start_time + "_" + drivingContinued.`type`
    val jestClient = EsUtils.getClient()
    EsUtils.insertIntosEs(jestClient, "driving_continued_index", drivingContinued, id, "_doc")
    jestClient.close()
  }

  /**
   * 随机生成一个全局唯一的20位数字风险编号
   */
  def createRiskNumber(): String = {
    val mills = System.currentTimeMillis()
    var hashCodeV = java.util.UUID.randomUUID.toString.hashCode
    //有可能是负数
    if (hashCodeV < 0) {
      hashCodeV = -hashCodeV;
    }
    mills + "%07d".format(hashCodeV).substring(0, 7)
  }

  /**
   * 获取数据条数
   *
   * @param query
   * @param index
   * @param indexType
   * @param client
   * @return
   */
  def getPageSize(query: String, index: String, indexType: String, client: JestClient): Long = {
    //获取max_result_window值
    val settings = new GetSettings.Builder().addIndex(index).build()
    val result: JestResult = client.execute(settings)
    val jsonObject = result.getJsonObject
    val element = jsonObject.getAsJsonObject(index).getAsJsonObject("settings").getAsJsonObject("index").get("max_result_window")

    //获取数据的条数
    val searchCount: Search = new Search.Builder(query).addIndex(index).addType(indexType).build()
    val resultSetCount: SearchResult = client.execute(searchCount)
    var total = resultSetCount.getTotal
    if (null == total) {
      0
    } else {
      // 防止两个查询中间新生成的报警导致报警数据不全
      total = total + 100
      val querymax =
        s"""
           |{
           | "index" : { "max_result_window" : ${total}}
           |}
          """.stripMargin

      //更新max_result_window值
      if (null != element) {
        val maxResultWindow = element.getAsLong
        if (total > maxResultWindow) {
          val updateSettings = new UpdateSettings.Builder(querymax).addIndex(index).build()
          client.execute(updateSettings)
        }
      } else {
        if (total > resultSetCount.getHits(classOf[util.HashMap[String, String]]).size()) {
          val updateSettings = new UpdateSettings.Builder(querymax).addIndex(index).build()
          client.execute(updateSettings)
        }
      }
      total
    }
  }

  /**
   * 获取指定时间和当前时间差，单位秒
   *
   * @param suppressEndtime
   * @return
   */
  def getTimeDifference(suppressEndtime: String): Int = {
    val format = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
    val endDate = DateTime.parse(suppressEndtime, format)
    val cal: Calendar = Calendar.getInstance()
    val time: Long = (endDate.getMillis - cal.getTimeInMillis()) / 1000
    time.toInt
  }

  /**
   * 获取在周期之内的驾驶员
   *
   * @param driverName
   * @param businessTime
   * @param lastDateTime
   * @param lastTotal
   * @param forPattern
   * @return
   */
  def getCurrDriver(driverName: String, businessTime: String, lastDateTime: String, lastTotal: Int, forPattern: DateTimeFormatter): String = {
    var currDriverName = ""
    if (StringUtils.isNotEmpty(businessTime) && StringUtils.isNotEmpty(lastDateTime)) {
      try {
        val businessMillis = DateTime.parse(businessTime, forPattern).getMillis
        val lastMillis = DateTime.parse(lastDateTime, forPattern).getMillis
        if ((lastMillis + lastTotal * 1000) <= businessMillis) {
          currDriverName = driverName
        }
      } catch {
        case e: Exception => {
          println(s"业务日期格式不对:${businessTime},${lastDateTime}")
        }
      }
    }
    currDriverName
  }

  //去除json字符串中的ASCII码和单引号，制表符，回车
  def rmAscII(string: String): String = {
    val str: String = string.replaceAll("[\\u0000-\\u001f]", "").replace("\\u", "u").replace("\\b", "").replaceAll("\'", "").replaceAll("\t", "").replaceAll("\n", "")
    str
  }

  /**
   * 定位补报
   *
   * @param json 定位补报数据
   * @return
   */
  def addLocation(json: JSONObject): Array[JSONObject] = {
    val gnss = json.getJSONArray("gnssList")
    json.remove("gnssList")
    val map = mutable.HashMap.empty[String, Any]
    json.keySet().asScala.foreach(key => map.put(key, json.get(key)))
    (0 until gnss.size()).map { index =>
      val addJson = gnss.getJSONObject(index)
      addJson.putAll(map.asJava)
      addJson
    }.toArray
  }

  //快速得到next数组
  def getNextQuickly(p: Array[Char]): Array[Int] = {
    //abaabaa
    val lenP = p.length
    var next = new Array[Int](lenP)

    next(0) = 0
    var x = 1 //x指向当前需要求出最大公共前后缀的位置
    var now = 0 //now指向当前可能是最大前缀的最后一个需要比较的位置
    while (x < lenP) {
      //println("x:"+x+" now:"+now)
      if (p(now) == p(x)) { //now==-1是特殊情况，因为我们单独设置了 next[0]=-1   ,如果 next[0]=0,这个条件可以去掉
        now += 1
        next(x) = now
        x += 1

      } else if (now != 0) { //当 当前的 p(now) != p(x)时，x不动，now应该减小
        now = next(now - 1)
      } else { //当now=0时，比到了 p(0) != p(x)，说明当前x是没有公共前后缀的，
        // now不动，下一次应该从p(0)和p(x+1)比较，所以 x+1
        next(x) = 0
        x += 1
      }
    }
    next
  }

  def kmp(s: Array[Char], p: Array[Char], next: Array[Int]): Array[Int] = {
    val array = ArrayBuffer[Int]()
    var i = 0 //主串中将要匹配的位置
    var j = 0 //子串中将要匹配的位置

    while (i < s.length) {
      if (s(i) == p(j)) {
        i += 1
        j += 1
      } else if (j != 0) {
        j = next(j - 1) //在j位置匹配失败  应该找出 p[0]-p[j-1]的最大公共前后缀长度K
      } else {
        i += 1
      }

      if (j == p.length) { //出现了模式串都匹配上的情况
        //println(i-j)
        array.append(i - j)
        j = next(j - 1)
      }
    }
    array.toArray
  }

  def getMatchByKMP(s: String, key: String): Array[Int] = {
    val charArrS = s.toCharArray
    val charArrP = key.toCharArray

    val next = getNextQuickly(charArrP)

    val array = kmp(charArrS, charArrP, next)

    array
  }
}
